12.5.11 並行処理ツールの協調
ゴルーチン、チャネル、select文を使ってコードを構造化することで、個々のステップを分離し、独立した部分を任意の順序で実行して完了できるようにし、依存部分間でデータをきれいに交換する。さらに、プログラムのどの部分もブロックしないようにして、この関数内と呼び出し済の関数の両方で設定されたタイムアウトを適切に処理する
code:go
// main.go
package main
import (
"context"
"fmt"
"log"
"os"
"time"
)
// -------------------- 構造化並行性の基盤部分 --------------------
// contextを用いてタイムアウトとキャンセルを上位から下位へ伝播させる
// → 「呼び出し階層全体にわたる制御フロー」
// --------------------------------------------------------------
func GatherAndProcess(ctx context.Context, data Input) (COut, error) {
ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
defer cancel() // ← 確実にキャンセル(リソース解放)
// -------------------- 並行実行の構成 --------------------
// abProcessor:AとBを独立したタスクとして並行実行
// → 「各ステップを独立したタスクとして安全に並行実行」
// ---------------------------------------------------------
ab := newABProcessor()
ab.start(ctx, data)
inputC, err := ab.wait(ctx)
if err != nil {
return COut{}, err
}
// -------------------- パイプラインの次段階 --------------------
// CProcessor:A/Bの結果を受け取り次の処理へ
// チャネルでデータを渡すことで「依存データを明確に交換」
// ------------------------------------------------------------
c := newCProcessor()
c.start(ctx, inputC)
out, err := c.wait(ctx)
return out, err
}
// main関数:最上位。context.Background() を起点にパイプライン全体を制御。
func main() {
if len(os.Args) < 3 {
fmt.Println("expected input to be processed")
os.Exit(1)
}
cout, err := GatherAndProcess(context.Background(), Input{
A: os.Args1,
B: os.Args2,
})
if err != nil {
log.Fatal(err)
}
fmt.Println(cout)
}
// ABProcessor.go
package main
import "context"
// AとBを並行実行するための構造体
type abProcessor struct {
outA chan aOut
outB chan bOut
errs chan error
}
// チャネルを用意して明示的なデータフローを構成
// → 「チャネルを介して依存データを明確にやり取り」
func newABProcessor() *abProcessor {
return &abProcessor{
outA: make(chan aOut, 1),
outB: make(chan bOut, 1),
errs: make(chan error, 2),
}
}
// -------------------- 並行処理の開始 --------------------
// 「ゴルーチンを使って各ステップを独立したタスクとして並行実行」
func (p *abProcessor) start(ctx context.Context, data Input) {
go func() {
aOut, err := getResultA(ctx, data.A)
if err != nil {
p.errs <- err
return
}
p.outA <- aOut
}()
go func() {
bOut, err := getResultB(ctx, data.B)
if err != nil {
p.errs <- err
return
}
p.outB <- bOut
}()
}
// -------------------- 並行タスクの協調 --------------------
// select によって結果・エラー・キャンセルを同時に監視
// → 「selectによる協調的監視」
// → 「ハングアップを防ぐ」
// ----------------------------------------------------------
func (p *abProcessor) wait(ctx context.Context) (cIn, error) {
var cData cIn
for count := 0; count < 2; count++ {
select {
case a := <-p.outA: // Aの結果受信
cData.a = a
case b := <-p.outB: // Bの結果受信
cData.b = b
case err := <-p.errs: // エラー即時返却
return cIn{}, err
case <-ctx.Done(): // タイムアウトまたはキャンセル
return cIn{}, ctx.Err()
}
}
return cData, nil
}
// CProcessor.go
package main
import "context"
// CProcessor:ABの結果を統合する段階
// → 「チャネルによる依存関係の明示とパイプライン構造」
type cProcessor struct {
outC chan COut
errs chan error
}
func newCProcessor() *cProcessor {
return &cProcessor{
outC: make(chan COut, 1),
errs: make(chan error, 1),
}
}
// -------------------- 並行タスク起動 --------------------
// ABの出力を受け取り、次の処理を独立したゴルーチンで実行
// → 「ステップを独立させつつデータ依存を維持」
func (p *cProcessor) start(ctx context.Context, inputC cIn) {
go func() {
cOut, err := getResultC(ctx, inputC)
if err != nil {
p.errs <- err
return
}
p.outC <- cOut
}()
}
// -------------------- 協調的終了処理 --------------------
// selectで結果・エラー・キャンセルを同時監視
// → 「ハングアップ防止」+「contextの伝播」
func (p *cProcessor) wait(ctx context.Context) (COut, error) {
select {
case out := <-p.outC:
return out, nil
case err := <-p.errs:
return COut{}, err
case <-ctx.Done():
return COut{}, ctx.Err()
}
}
code:mermaid
sequenceDiagram
autonumber
participant Main as 🧭 main()
participant GAP as ⚙️ GatherAndProcess()
participant AB as 🧩 abProcessor
participant A as 🔵 getResultA()
participant B as 🟢 getResultB()
participant C as 🟣 cProcessor
participant GC as 🧠 getResultC()
participant CTX as ⏳ context(WithTimeout)
Note over Main,CTX: context.WithTimeout(ctx, 50ms)
Main->>GAP: 呼び出し(Input{A,B})
activate GAP
GAP->>CTX: タイムアウト付きcontext生成
CTX-->>GAP: ctx(50ms制限)
Note over GAP,AB: abProcessorの起動
GAP->>AB: newABProcessor()
GAP->>AB: start(ctx, Input{A,B})
activate AB
par 並行ゴルーチンA
AB->>A: getResultA(ctx, data.A)
A-->>AB: 結果 aOut または エラー
AB-->>AB: outA または errs に送信
and 並行ゴルーチンB
AB->>B: getResultB(ctx, data.B)
B-->>AB: 結果 bOut または エラー
AB-->>AB: outB または errs に送信
end
Note over AB: selectでA/B結果・エラー・ctx.Done()を監視
AB-->>GAP: 両結果 (cIn) またはエラー/キャンセル
deactivate AB
Note over GAP,C: cProcessorの起動
GAP->>C: newCProcessor()
GAP->>C: start(ctx, cIn)
activate C
C->>GC: getResultC(ctx, cIn)
GC-->>C: 結果 COut または エラー
C-->>GAP: outC または errs チャネル経由で返す
deactivate C
GAP-->>Main: COut または エラー
deactivate GAP
Note over CTX: タイムアウト50ms経過時に ctx.Done() を全goroutineへ通知